package com.cs.test.mq;

import com.cs.test.common.Constant;
import com.cs.test.pojo.MsgLog;
import com.cs.test.service.MsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;

/**
 * 不知道大家发现没有, 在MailConsumer中, 真正的业务逻辑其实只是发送邮件mailUtil.send(mail)而已, 但我们又不得不在调用send方法之前校验消费幂等性, 发送后,
 * 还要更新消息状态为"已消费"状态, 并手动ack, 实际项目中, 可能还有很多生产者-消费者的应用场景, 如记录日志, 发送短信等等, 都需要rabbitmq, 如果每次都写这些重复的公用代码,
 * 没必要, 也难以维护, 所以, 我们可以将公共代码抽离出来, 让核心业务逻辑只关心自己的实现, 而不用做其他操作, 其实就是AOP
 * <p>
 * 为达到这个目的, 有很多方法, 可以用spring aop, 可以用拦截器, 可以用静态代理, 也可以用动态代理, 在这里, 我用的是动态代理
 */
@Slf4j
public class BaseConsumerProxy {

    private Object target;

    private MsgLogService msgLogService;

    public BaseConsumerProxy(Object target, MsgLogService msgLogService) {
        this.msgLogService = msgLogService;
        this.target = target;
    }

    public Object getProxy() {
        ClassLoader classLoader = target.getClass().getClassLoader();
        Class<?>[] interfaces = target.getClass().getInterfaces();

        Object proxyInstance = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            Message message = (Message) args[0];
            Channel channel = (Channel) args[1];

            String correlationId = getCorrelationId(message);
            // 消息幂等性, 防止重复消费
            if (isConsumed(correlationId)) {
                log.info("重复消费, correlationId:{}", correlationId);
                return null;
            }

            MessageProperties properties = message.getMessageProperties();
            long tag = properties.getDeliveryTag();

            try {
                // 真正消费消息的业务
                Object result = method.invoke(target, args);
                msgLogService.updateStatus(correlationId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
                channel.basicAck(tag, false);
                return result;
            } catch (IllegalAccessException e) {
                log.error("getProxy error", e);
                channel.basicNack(tag, false, true);
                return null;
            } catch (IllegalArgumentException e) {
                log.error("getProxy error", e);
                channel.basicNack(tag, false, true);
                return null;
            } catch (InvocationTargetException e) {
                log.error("getProxy error", e);
                channel.basicNack(tag, false, true);
                return null;
            } catch (IOException e) {
                log.error("getProxy error", e);
                channel.basicNack(tag, false, true);
                return null;
            }
        });
        return proxyInstance;
    }

    /**
     * 获取CorrelationId
     *
     * @param message
     * @return
     */
    private String getCorrelationId(Message message){
        String correlationId = null;
        MessageProperties properties = message.getMessageProperties();
        Map<String, Object> headers = properties.getHeaders();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            String key = entry.getKey();
            String value = (String) entry.getValue();
            if (key.equals("spring_returned_message_correlation")){
                correlationId = value;
            }
        }
        return correlationId;
    }

    /**
     * 消息是否被消费
     * @param correlationId
     * @return
     */
    private boolean isConsumed(String correlationId){
        MsgLog msgLog = msgLogService.selectByMsgId(correlationId);
        if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)){
            return true;
        }
        return false;
    }
}
